【アップデート】Lambdaのイベントソースマッピングでイベントのフィルタが可能になりました
MAD事業部@大阪の岩田です。先日のアップデートによってLambdaのイベントソースマッピングでイベントのフィルタが可能になりました。
現在フィルタがサポートされるイベントソースは
- Kinesis Data Streams
- DynamoDB(DynamoDB Streams)
- SQS
の3種です。MQやMSKはサポートされません。
何がうれしいのか
これまではイベントソースに流れてきた全てのレコードがLambdaのイベントデータに引き渡されていましたが、ユースケースによっては必ずしも全てのレコードに対してロジックを実行する必要がありません。私は以前IoTシステムのバックエンドを構築した際に「インテリジェントシャドウパターン」のような設計を採用したことがあるのですが、DynamoDB StreamsからLambdaを起動して後続処理を流したいのはDynamoDBにINSERTされた場合のみでした。しかしDynamoDB StreamsにはMODIFYやREMOVEのイベントも流れてくるのでLambda側のロジックでイベント種別を判定してMODIFYやREMOVEのイベントは処理をスキップする処理を実装していました。
図は最新 IoT デザインパターン 〜AWS IoT と AWS Greengrass を用いた構築パターン〜より引用
今回のアップデートを利用することでLambda側での判定処理が不要になるので
- Lambdaのコードの簡素化
- 不要なLambda実行に伴うコストの削減
などが期待できます。
フィルタの概要
イベントソースマッピングに対してフィルタリング条件を指定するとLambdaに引き渡すイベントデータをフィルタできます。フィルタリングは条件は以下のような構造で定義されます。
{ "Filters": [ { "Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" }, { "Pattern": ...略 } ] }
Pattern
の部分で定義されるフィルタは最大5つまで指定でき、上限緩和申請を行うことで最大10個まで指定できるようになります。フィルタを複数指定した場合、イベントソースに流れてきたレコードがいずれかのフィルタにマッチするとLambdaのイベントデータに引き渡されます。
フィルタはJSON形式の文字列で、JSON形式に展開すると以下のような構造です
{ "Metadata1": [ pattern1 ], "data": { "Data1": [ pattern2 ] } }
フィルタは以下3つの要素で構成されます。
- メタデータプロパティ
- イベントデータのメタデータ
- 上記サンプルの
Metadata1
部分 - 例えばKinesis Data Streamsがイベントソースの場合は
kinesisSchemaVersion
やpartitionKey
が、DynamoDB Streamsがイベントソースの場合はeventName
がメタデータプロパティにあたります
- データプロパティ
- 上記サンプルの
Data1
部分 - イベントデータのボディの各プロパティに相当します
- データプロパティの親にあたるプロパティはイベントソースによって異なります
- Kinesis Data Streamsの
data
、SQSの場合はbody
、DynamoDB Streamsの場合はdynamodb
です
- 上記サンプルの
- フィルタルール
- メタデータプロパティもしくはデータプロパティに適用するフィルタを定義します。
フィルタルールには以下の比較演算子がサポートされています
比較演算子 | 比較したい内容 | フィルタルールの記述 |
---|---|---|
Null | UserIDがNULLか? | "UserID": [ null ] |
Empty | LastNameが空文字列か? | "LastName": [""] |
Equals | Nameが"Alice"か? | "Name": [ "Alice" ] |
And | Locationが "New York" かつDay が "Monday"か? | "Location": [ "New York" ], "Day": ["Monday"] |
Or | PaymentType が "Credit" もしくは "Debit"か? | "PaymentType": [ "Credit", "Debit"] |
Not | Weather が "Raining" でないか? | "Weather": [ { "anything-but": [ "Raining" ] } ] |
Numeric (equals) | Price が 100か? | "Price": [ { "numeric": [ "=", 100 ] } ] |
Numeric (range) | Price が 10より超過 ~ 20未満か? | "Price": [ { "numeric": [ ">", 10, "<=", 20 ] } ] |
Exists | プロパティProductName が存在するか? | "ProductName": [ { "exists": true } ] |
Does not exist | プロパティProductName が存在しないか? | "ProductName": [ { "exists": false } ] |
Begins with | Region が us-から始まるか? | "Region": [ {"prefix": "us-" } ] |
イベントのフィルタは以下のようなフローで評価されます。
図はFiltering event sources for AWS Lambda functionsより引用
フィルタ条件を満たしたレコードのみがLambdaで処理するバッチに入るので、フィルタ条件を満たさないレコードがバッチサイズを無駄に消費することはありません。
やってみる
簡単にイベントソースのフィルタを試してみます。以前自前実装していた、DynamoDB Streamsに流れてきたレコードのうち、INSERTのみを処理するフィルタを試してみます。
まずは適当なDynamoDBのテーブルを作成し、ストリームを有効化します。
パーティションキー:idのみ定義したシンプルなテーブルです。テーブル名はfilter-test
としています
続いてLambdaを作成します。イベントデータの中身を1レコードづつログに出力するシンプルなLambdaです
def lambda_handler(event, context): for rec in event['Records']: print(rec)
Lambdaのトリガーとして先程のテーブルのストリームを指定します。
フィルタリング条件には
{ "filters": [ { "pattern": "{\"eventName\": [\"INSERT\"]}" } ] }
を指定しています。pattern
の直下にはメタデータプロパティであるeventName
を指定し、比較演算子には["INSERT"]
を指定しています。これでDynamoDBにアイテムが新規登録された場合だけLambdaが起動するようになります。
準備ができたのでDynamoDBにCRUD操作を行いつつ、CloudWatch LogsからLambdaのログを確認してみましょう
まずは新規追加
$aws dynamodb put-item --table-name filter-test --item '{"id":{"S":"1"}}'
追加したアイテムの更新
$aws dynamodb update-item --table-name filter-test --key '{"id":{"S":"1"}}' --update-expression 'SET key1=:val1' --expression-attribute-values '{":val1":{"S":"val1"}}'
追加したアイテムの削除
$aws dynamodb delete-item --table-name filter-test --key '{"id":{"S":"1"}}'
CloudWatch Logsを確認するとアイテムの新規追加時のみLambdaが起動していることが分かります
まとめ
ユースケース次第では非常に嬉しいアップデートではないでしょうか?これもいわゆるre:Invent予選落ちというやつでしょうかね。re:Inventの直前にLambdaの小型アップデートがいくつか発表されつつ、re:Inventで大きなアップデートが発表されるというのが恒例行事なので、次週からも色々なアップデートに期待していきたいです。